package com.mimo.sdk;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.gson.Gson;
import com.mimo.common.utils.RandomStringUtils;
import com.mimo.sdk.session.CommonConstantConfig;
import com.mimo.sdk.session.ConnectionStatus;
import com.mimo.sdk.session.ICallback;

import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.Response;
import okhttp3.WebSocket;
import okhttp3.WebSocketListener;

/**
 * WebSocket client for the Mimo messaging service, built on OkHttp.
 *
 * <p>The client opens a WebSocket to {@code url?uid=..&token=..&deviceId=..&terminal=WEB&version=1.0.0},
 * sends JSON commands (P2P / room messages, join / leave room, heartbeat, logout) and dispatches
 * incoming messages to the supplied {@link ICallback}. Every non-heartbeat command that carries an
 * {@code id} is remembered in a pending map until the server acknowledges it, reports an error for
 * it, or it expires.
 *
 * <p>State is touched from several threads (caller threads, OkHttp listener threads and two
 * {@link Timer} threads). The shared state fields are {@code volatile} and {@link #init()} is
 * {@code synchronized}; no further ordering guarantees are provided.
 */
public class MimoClient {
  public static final Logger log = LoggerFactory.getLogger(MimoClient.class);

  public static final Gson gson = new Gson();

  private static final OkHttpClient okHttpClient = new OkHttpClient.Builder().connectTimeout(10L, TimeUnit.SECONDS)
      .readTimeout(10L, TimeUnit.SECONDS).writeTimeout(10L, TimeUnit.SECONDS).build();

  private final MimoSocketListener listener = new MimoSocketListener();

  private final String origionUrl;
  private final String url;
  private final String user;
  private final String token;
  private final String deviceId;

  // Commands that were handed to send() and are still waiting for an Ack/Error: msgId -> command.
  private final Map<String, Map<String, Object>> idToDataOnPost;

  private final ICallback callback;

  // Written by OkHttp listener threads, read by caller and timer threads, hence volatile.
  private volatile WebSocket ws;

  private volatile long lastActiveTime;

  private volatile ConnectionStatus status;

  // Heartbeat timer.
  private Timer heartbeatTimer;

  // Timer that scans the pending-command map for expired entries.
  private Timer msgCheckTimer;

  /**
   * Creates a client; no network activity happens until {@link #connect()} is called.
   *
   * <p>Original author's note on {@code deviceId}: it distinguishes devices. The same account with
   * the same device id is treated by the system as a normal reconnect; the same account with a
   * device id that differs from the one the server currently holds is treated as a competing login
   * from another device.
   *
   * @param url      base WebSocket URL (query parameters are appended by this constructor)
   * @param user     user id, sent as the {@code uid} query parameter
   * @param token    authentication token, sent as the {@code token} query parameter
   * @param deviceId device identifier, sent as the {@code deviceId} query parameter
   * @param callback receiver for connection and message events; may be {@code null}
   */
  public MimoClient(String url, String user, String token, String deviceId, ICallback callback) {

    this.origionUrl = url;
    this.user = user;
    this.token = token;
    this.deviceId = deviceId;
    this.callback = callback;

    // Build the target URL of the request.
    // NOTE(review): values are appended verbatim (not URL-encoded), as in the original code.
    String beforeToken = url + "?uid=" + user + "&token=";
    String afterToken = "&deviceId=" + this.deviceId + "&terminal=WEB" + "&version=1.0.0";
    this.url = beforeToken + token + afterToken;

    // Stores the data that has been sent: msgId -> data.
    this.idToDataOnPost = new ConcurrentHashMap<>();

    // Timestamp of the most recent activity on the connection.
    this.lastActiveTime = 0;

    this.status = ConnectionStatus.None;

    // The token is a credential, so it is masked in the log output.
    log.info("URL -> {}", beforeToken + "***" + afterToken);
  }

  /**
   * Manually starts the login / connect sequence.
   *
   * <p>If a socket from a previous connection is still held it is cancelled first, so that the
   * client never keeps two connections.
   *
   * @throws IllegalArgumentException if the client was constructed without a URL
   */
  public void connect() {
    // The composed url field can never be null, so the check must look at the caller-supplied URL.
    if (Objects.isNull(this.origionUrl)) {
      throw new IllegalArgumentException("实例对象没有初始化");
    }

    // To avoid duplicate connections, tear down the existing socket if there is one.
    WebSocket existing = this.ws;
    if (Objects.nonNull(existing)) {
      log.info("强制关闭 ws 连接");
      existing.cancel();
      this.ws = null;
      // The socket was just killed, so the client is no longer connected; without this init()
      // would return early ("already connected") and leave the client without any socket.
      if (this.status == ConnectionStatus.Connected) {
        this.status = ConnectionStatus.Disconnected;
      }
    }

    this.init();
  }

  /**
   * Sends a peer-to-peer message.
   *
   * @param targetPeer id of the receiving peer
   * @param content    message content
   */
  public void sendP2PMessage(String targetPeer, String content) {
    Map<String, Object> p = new HashMap<>();
    p.put("id", RandomStringUtils.uniqueRandom());
    p.put("type", "P2P");
    p.put("target", targetPeer);
    p.put("content", content);
    p.put("ts", System.currentTimeMillis());
    this.send(p);
  }

  /**
   * Sends a message to a room.
   *
   * @param roomId  id of the target room
   * @param content message content
   */
  public void sendRoomMessage(String roomId, String content) {
    Map<String, Object> r = new HashMap<>();
    r.put("id", RandomStringUtils.uniqueRandom());
    r.put("type", "P2Room");
    r.put("target", roomId);
    r.put("content", content);
    r.put("ts", System.currentTimeMillis());
    this.send(r);
  }

  /**
   * Requests to join a room; the outcome is reported through
   * {@code ICallback.onJoinRoomSuccess} / {@code ICallback.onJoinRoomFailed}.
   *
   * @param roomId id of the room to join
   */
  public void joinRoom(String roomId) {
    Map<String, Object> join = new HashMap<>();
    join.put("id", RandomStringUtils.uniqueRandom());
    join.put("type", "JoinRoom");
    join.put("target", roomId);
    join.put("ts", System.currentTimeMillis());
    this.send(join);
  }

  /**
   * Requests to leave a room; the outcome is reported through
   * {@code ICallback.onLeaveRoomSuccess} / {@code ICallback.onLeaveRoomFailed}.
   *
   * @param roomId id of the room to leave
   */
  public void leaveRoom(String roomId) {
    Map<String, Object> leave = new HashMap<>();
    leave.put("id", RandomStringUtils.uniqueRandom());
    leave.put("type", "LeaveRoom");
    leave.put("target", roomId);
    leave.put("ts", System.currentTimeMillis());
    this.send(leave);
  }

  /**
   * Serializes {@code data} to JSON and writes it to the socket.
   *
   * <p>Non-heartbeat commands get a {@code ts} if they lack one and, when they carry an
   * {@code id}, are recorded in the pending map before the send is attempted. If the connection is
   * not in the Connected state nothing is written and a reconnect is attempted instead.
   */
  private void send(Map<String, Object> data) {
    // Read the field once so a concurrent logout cannot null it between the check and the send.
    WebSocket socket = this.ws;
    if (Objects.isNull(socket)) {
      log.error("客户端已登出，请重新登录");
      return;
    }

    if (!Objects.equals(data.get("type"), "HeartBeat")) {
      if (!data.containsKey("ts")) {
        data.put("ts", System.currentTimeMillis());
      }
      if (data.containsKey("id")) {
        this.idToDataOnPost.put((String) data.get("id"), data);
      }
    }

    if (this.checkWebSocketStatusBeforeSend()) {
      socket.send(gson.toJson(data));
    } else {
      log.info("Current status Abnormal,try to reconnect...");
      this.init();
    }
  }

  /**
   * Opens a new WebSocket unless the client is already connecting, already connected, or has been
   * terminated.
   */
  private synchronized void init() {

    if (this.status == ConnectionStatus.Connecting) { // Already connecting: nothing to do.
      log.info("已经在重连中了");
      return;
    }

    if (this.status == ConnectionStatus.Connected) { // Already connected: nothing to do.
      log.info("已经连接成功了");
      return;
    }

    if (this.status == ConnectionStatus.Terminate) { // The client was terminated: do not reconnect.
      log.info("连接过程已经结束");
      return;
    }

    this.status = ConnectionStatus.Connecting;

    // Forcefully close the previous channel, if any.
    // NOTE(review): the cancelled socket still reports onFailure later, which sets the status to
    // Disconnected even though a new connection attempt may be in progress.
    WebSocket previous = this.ws;
    if (Objects.nonNull(previous)) {
      previous.cancel();
    }

    if (hasCallback()) { // Report the connection state.
      callback.onConnecting();
    }

    Request request = new Request.Builder().url(this.url).build();
    okHttpClient.newWebSocket(request, listener);
  }

  /**
   * Tells whether a message may be written right now.
   *
   * @return {@code true} only if a socket is held and the status is Connected
   */
  public boolean checkWebSocketStatusBeforeSend() {
    if (Objects.isNull(ws) || status == ConnectionStatus.None) {
      log.error("连接尚未初始化");
      return false;
    }

    return Objects.equals(status, ConnectionStatus.Connected);
  }

  /**
   * Stops both timers, marks the client as terminated, drops all pending commands and closes the
   * socket. Safe to call when the client never connected.
   *
   * @param forceLogout {@code true} to drop the connection immediately without telling the
   *                    server; {@code false} to send a {@code Logout} command first and then close
   *                    the socket gracefully so that the command is actually transmitted
   */
  public void logout(boolean forceLogout) {
    // Stop the timers. Either may be absent if the socket never opened.
    log.info("下线停掉心跳定时器");
    Timer heartbeat = this.heartbeatTimer;
    if (Objects.nonNull(heartbeat)) {
      heartbeat.cancel();
    }
    this.heartbeatTimer = null;

    log.info("下线停掉过期消息定时器");
    Timer msgCheck = this.msgCheckTimer;
    if (Objects.nonNull(msgCheck)) {
      msgCheck.cancel();
    }
    this.msgCheckTimer = null;

    WebSocket current = this.ws;

    // For a voluntary logout, send the Logout command before the status flips to Terminate;
    // once terminated, send() refuses to write anything.
    if (!forceLogout && Objects.nonNull(current) && this.status == ConnectionStatus.Connected) {
      log.info("主动发送退登指令");
      Map<String, Object> lg = new HashMap<>();
      lg.put("id", RandomStringUtils.uniqueRandom());
      lg.put("type", "Logout");
      lg.put("ts", System.currentTimeMillis());
      current.send(gson.toJson(lg));
    }

    // Mark the client as terminated.
    log.info("当前状态标记终止");
    this.status = ConnectionStatus.Terminate;

    // Drop everything still waiting for an acknowledgement.
    this.idToDataOnPost.clear();

    // Close the connection.
    if (Objects.nonNull(current)) {
      log.info("强制关闭 ws 连接");
      if (forceLogout) {
        current.cancel();
      } else {
        // cancel() discards enqueued frames; close() transmits them (the Logout command) first.
        current.close(1000, null);
      }
      this.ws = null;
    }
  }

  private boolean isTerminated() {
    return status == ConnectionStatus.Terminate;
  }

  /** Returns whether an event callback was supplied at construction time. */
  private boolean hasCallback() {
    return Objects.nonNull(callback);
  }

  /**
   * Starts the heartbeat timer: each tick sends a HeartBeat command while connected and triggers a
   * reconnect when no activity has been seen for longer than
   * {@code CommonConstantConfig.MIMO_EXPIRE_IN_MILL}.
   */
  private void startHeartbeatTimer() {
    Timer timer = new Timer("心跳定时器");
    timer.scheduleAtFixedRate(new TimerTask() {
      @Override
      public void run() {
        if (Objects.nonNull(ws) && status == ConnectionStatus.Connected) {
          Map<String, Object> heartBeat = new HashMap<>();
          heartBeat.put("id", RandomStringUtils.uniqueRandom());
          heartBeat.put("type", "HeartBeat");
          send(heartBeat);
        }

        long now = System.currentTimeMillis();
        ConnectionStatus current = status;
        boolean expired = Math.abs(now - lastActiveTime) > CommonConstantConfig.MIMO_EXPIRE_IN_MILL;
        // Skip while a connection attempt is already in flight; forcing another reconnect here
        // would open an additional socket on every tick until the first attempt completes.
        if (expired && current != ConnectionStatus.Terminate && current != ConnectionStatus.Connecting) {
          log.warn("心跳超时，启动重连");
          status = ConnectionStatus.Disconnected;
          init();
        }
      }
    }, CommonConstantConfig.MIMO_HEARTBEAT_TIMING, CommonConstantConfig.MIMO_HEARTBEAT_TIMING);
    heartbeatTimer = timer;
  }

  /**
   * Starts the timer that removes (and logs) pending commands older than
   * {@code CommonConstantConfig.MSG_DROP_EXPIRE_IN_MILL}.
   */
  private void startMessageExpiryTimer() {
    Timer timer = new Timer("过期消息检查器");
    timer.scheduleAtFixedRate(new TimerTask() {
      @Override
      public void run() {
        if (status == ConnectionStatus.Terminate) {
          return;
        }
        long now = System.currentTimeMillis();
        for (Entry<String, Map<String, Object>> pending : idToDataOnPost.entrySet()) {
          Object ts = pending.getValue().get("ts");
          // An uncaught exception would kill the Timer thread, so tolerate a missing timestamp.
          if (!(ts instanceof Number)) {
            continue;
          }
          if (Math.abs(now - ((Number) ts).longValue()) > CommonConstantConfig.MSG_DROP_EXPIRE_IN_MILL) {
            Map<String, Object> dropped = idToDataOnPost.remove(pending.getKey());
            if (Objects.nonNull(dropped)) {
              log.warn("过期消息：{}", gson.toJson(dropped));
            }
          }
        }
      }
    }, CommonConstantConfig.MIMO_MSG_CHECKING_TIMING, CommonConstantConfig.MIMO_MSG_CHECKING_TIMING);
    msgCheckTimer = timer;
  }

  // ---------------------------------------------------------------------------------
  /** Bridges OkHttp WebSocket events to the client state and to the user callback. */
  private class MimoSocketListener extends WebSocketListener {

    @Override
    public void onOpen(WebSocket webSocket, Response response) {
      super.onOpen(webSocket, response);
      // Keep a reference to the opened socket.
      ws = webSocket;

      lastActiveTime = System.currentTimeMillis();
      status = ConnectionStatus.Connected;
      if (hasCallback()) {
        callback.onConnected();
      }

      // Both timers are created once and survive reconnects; logout() clears them.
      if (Objects.isNull(heartbeatTimer)) {
        startHeartbeatTimer();
      }

      if (Objects.isNull(msgCheckTimer)) {
        startMessageExpiryTimer();
      }
    }

    @Override
    public void onMessage(WebSocket webSocket, String text) {
      super.onMessage(webSocket, text);

      // Once the client has been terminated, anything still flushed out of the pipe is dropped
      // without any processing.
      if (isTerminated()) {
        log.warn("在管道关闭后,管道中还残留相关消息:{}", text);
        return;
      }

      lastActiveTime = System.currentTimeMillis();
      status = ConnectionStatus.Connected;

      // Gson can only be given the raw Map class here; the payload is a JSON object.
      @SuppressWarnings("unchecked")
      Map<String, Object> msg = gson.fromJson(text, Map.class);
      if (Objects.isNull(msg)) { // Empty or literal-null payload: nothing to dispatch.
        return;
      }
      String msgType = (String) msg.get("type");

      if (Objects.equals(msgType, "Ack")) {
        handleAck(msg);
      } else {
        // Everything except room messages is acknowledged. Reply on the socket that delivered
        // the message rather than on the shared field, which may already have been cleared.
        if (!Objects.equals(msgType, "Room2Peer")) {
          Map<String, Object> ack = new HashMap<>();
          ack.put("target", msg.get("id"));
          ack.put("id", RandomStringUtils.uniqueRandom());
          ack.put("type", "Ack");
          webSocket.send(gson.toJson(ack));
        }

        if (hasCallback()) {
          if (Objects.equals(msgType, "P2P")) {
            callback.onP2PMessage((String) msg.get("from"), (String) msg.get("content"));
          } else if (Objects.equals(msgType, "Room2Peer")) {
            callback.onRoomMessage((String) msg.get("roomId"), (String) msg.get("from"),
                (String) msg.get("content"));
          }
        }
      }

      if (Objects.equals(msgType, "Error")) {
        handleError(msg);
      }
    }

    /**
     * Clears the pending command an Ack refers to and reports join / leave success for it.
     */
    private void handleAck(Map<String, Object> msg) {
      Object ackedId = msg.get("target");
      // The pending map rejects null keys, so both fields must actually be present.
      if (!msg.containsKey("id") || Objects.isNull(ackedId)) {
        return;
      }

      Map<String, Object> queuedCmd = idToDataOnPost.remove(ackedId);
      if (Objects.isNull(queuedCmd)) {
        return;
      }

      Object queuedCmdType = queuedCmd.get("type");
      String roomId = (String) queuedCmd.get("target");
      if (Objects.equals(queuedCmdType, "JoinRoom")) {
        log.info("加入房间{}成功", roomId);
        if (hasCallback()) {
          callback.onJoinRoomSuccess(roomId);
        }
      } else if (Objects.equals(queuedCmdType, "LeaveRoom")) {
        log.info("离开房间{}成功", roomId);
        if (hasCallback()) {
          callback.onLeaveRoomSuccess(roomId);
        }
      }
    }

    /**
     * Handles an Error message: reports join / leave failure for the command it refers to, logs
     * out when the code is one of the terminating codes, and forwards the error to the callback.
     */
    private void handleError(Map<String, Object> msg) {
      // For errors the original command is fetched back so the business layer can react to it.
      Object failedId = msg.get("id");
      Map<String, Object> queuedErrorCmd = Objects.isNull(failedId) ? null : idToDataOnPost.remove(failedId);
      if (Objects.nonNull(queuedErrorCmd)) {
        String queuedErrorCmdType = String.class.cast(queuedErrorCmd.get("type"));
        String roomId = String.class.cast(queuedErrorCmd.get("target"));
        if (Objects.equals(queuedErrorCmdType, "JoinRoom")) {
          log.warn("加房间{}失败回调", roomId);
          if (hasCallback()) {
            callback.onJoinRoomFailed(roomId);
          }
        } else if (Objects.equals(queuedErrorCmdType, "LeaveRoom")) {
          log.warn("离开房间{}失败回调", roomId);
          if (hasCallback()) {
            callback.onLeaveRoomFailed(roomId);
          }
        }
      }

      // NOTE(review): the code is assumed to be present and numeric; Gson decodes JSON numbers
      // as Double - confirm that the key type of CommonConstantConfig.terminates matches.
      if (CommonConstantConfig.terminates.containsKey(msg.get("code"))) { // Forced-termination code.
        log.warn("强制中止编码:{}  中止信息:{}", msg.get("code"), CommonConstantConfig.terminates.get(msg.get("code")));
        logout(true);
      }
      if (hasCallback()) {
        callback.onErrorMessage(((Number) msg.get("code")).intValue(), (String) msg.get("msg"));
      }
    }

    @Override
    public void onClosed(WebSocket webSocket, int code, String reason) {
      super.onClosed(webSocket, code, reason);
      log.warn("closed");
      // NOTE(review): this also replaces a Terminate status set by logout(), as it always did.
      status = ConnectionStatus.Disconnected;
      if (hasCallback()) {
        callback.onComplete(null);
      }
    }

    @Override
    public void onFailure(WebSocket webSocket, Throwable t, Response response) {
      super.onFailure(webSocket, t, response);
      log.warn("Failure:{},resp:{}", t, response);
      // NOTE(review): this also replaces a Terminate status set by logout(), as it always did.
      status = ConnectionStatus.Disconnected;
      if (hasCallback()) {
        callback.onComplete(t);
      }
    }
  }
  // ---------------------------------------------------------------------------------
}
